package com.xj.hadoop.hive;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Input format that reads text files line by line and exposes the first
 * whitespace-delimited token of each line as the key and the second token as
 * the value.
 *
 * <p>Files are never split, so every file is handled by a single
 * {@link MyRecordReader}.
 */
public class MyInputFormat extends FileInputFormat<Text, Text> {

	/**
	 * Disables splitting so that one reader always sees a whole file.
	 *
	 * @param context  the job context (unused)
	 * @param filename the file being considered (unused)
	 * @return always {@code false}
	 */
	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false;
	}

	/**
	 * Creates the reader for a split. The reader is not initialized here;
	 * {@link MyRecordReader#initialize} must be called before reading.
	 *
	 * @param split   the split to read (unused here)
	 * @param context the task attempt context (unused here)
	 * @return a new, uninitialized {@link MyRecordReader}
	 */
	@Override
	public RecordReader<Text, Text> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException, InterruptedException {
		return new MyRecordReader();
	}

	/**
	 * Record reader producing one (first token, second token) pair per input line.
	 *
	 * <p>The key and value {@link Text} objects are reused between calls to
	 * {@link #nextKeyValue()}.
	 */
	static class MyRecordReader extends RecordReader<Text, Text> {
		/** Line-oriented reader over the opened file; {@code null} until initialized. */
		public LineReader in;
		/** Key of the current record (first token of the line). */
		public Text lineKey;
		/** Value of the current record (second token of the line). */
		public Text lineValue;
		/** Tokenizer over the most recently read line. */
		public StringTokenizer token = null;
		/** Buffer holding the most recently read raw line. */
		public Text line;

		/** Length of the split in bytes, used as the denominator for progress. */
		private long totalBytes;
		/** Number of bytes consumed from the file so far. */
		private long bytesRead;

		/**
		 * Closes the underlying line reader, if one was opened.
		 *
		 * @throws IOException if closing the reader fails
		 */
		@Override
		public void close() throws IOException {
			// initialize() may have failed or never been called.
			if (in != null) {
				in.close();
			}
		}

		/** @return the key of the current record */
		@Override
		public Text getCurrentKey() throws IOException, InterruptedException {
			return lineKey;
		}

		/** @return the value of the current record */
		@Override
		public Text getCurrentValue() throws IOException, InterruptedException {
			return lineValue;
		}

		/**
		 * Reports the fraction of the split consumed so far.
		 *
		 * @return a value between 0 and 1; 0 when the split length is not positive
		 */
		@Override
		public float getProgress() throws IOException, InterruptedException {
			if (totalBytes <= 0) {
				return 0.0f;
			}
			return Math.min(1.0f, bytesRead / (float) totalBytes);
		}

		/**
		 * Opens the file behind the split and prepares the reusable buffers.
		 * The file is read from its beginning; no seek to the split offset is
		 * performed.
		 *
		 * @param input   the split to read; must be a {@link FileSplit}
		 * @param context supplies the configuration used to open the file
		 * @throws IOException if the file cannot be opened
		 */
		@Override
		public void initialize(InputSplit input, TaskAttemptContext context) throws IOException,
				InterruptedException {
			FileSplit split = (FileSplit) input;
			Configuration job = context.getConfiguration();
			Path file = split.getPath();
			FileSystem fs = file.getFileSystem(job);
			FSDataInputStream filein = fs.open(file);
			in = new LineReader(filein, job);
			totalBytes = split.getLength();
			bytesRead = 0;
			line = new Text();
			lineKey = new Text();
			lineValue = new Text();
		}

		/**
		 * Reads the next line and splits it into key and value.
		 *
		 * <p>A line with fewer than two tokens yields an empty string for each
		 * missing part instead of failing. Tokens beyond the second are ignored.
		 *
		 * @return {@code true} if a record was read, {@code false} at end of file
		 * @throws IOException if reading from the file fails
		 */
		@Override
		public boolean nextKeyValue() throws IOException, InterruptedException {
			int linesize = in.readLine(line);
			// readLine reports zero bytes consumed only at end of file.
			if (linesize == 0) {
				return false;
			}
			bytesRead += linesize;

			token = new StringTokenizer(line.toString());
			String first = token.hasMoreTokens() ? token.nextToken() : "";
			// A tokenizer token never contains whitespace, so the second token
			// is already the complete value; no further splitting is needed.
			String second = token.hasMoreTokens() ? token.nextToken() : "";

			lineKey.set(first);
			lineValue.set(second);
			return true;
		}
	}

	/**
	 * Small manual check that prints how many parts a sample string splits into.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		String str = "st sg gd";
		System.out.println(str.split(" ").length);
	}

}
